mmlspark

In this tutorial, we perform the same classification task in two different ways: once using plain pyspark and once using the mmlspark library. The two methods yield the same performance, but one of the two libraries is drastically simpler to use and iterate on (can you guess which one?).
The task is simple: Predict whether a user's review of a book sold on Amazon is good (rating > 3) or bad based on the text of the review. We accomplish this by training LogisticRegression learners with different hyperparameters and choosing the best model.
In [ ]:
import pandas as pd
import mmlspark
from pyspark.sql.types import IntegerType, StringType, StructType, StructField
dataFile = "BookReviewsFromAmazon10K.tsv"
textSchema = StructType([StructField("rating", IntegerType(), False),
                         StructField("text", StringType(), False)])
import os, urllib.request
if not os.path.isfile(dataFile):
    urllib.request.urlretrieve("https://mmlspark.azureedge.net/datasets/" + dataFile, dataFile)
raw_data = spark.createDataFrame(pd.read_csv(dataFile, sep="\t", header=None), textSchema)
raw_data.show(5)
Real data, however, is more complex than the dataset above. It is common for a dataset to have features of multiple types: text, numeric, and categorical. To illustrate how difficult it is to work with these datasets, we add two numeric features to the dataset: the word count of the review and the mean word length.
In [ ]:
from pyspark.sql.functions import udf
from pyspark.sql.types import LongType, FloatType, DoubleType
def word_count(s):
    return len(s.split())

def word_length(s):
    import numpy as np
    ss = [len(w) for w in s.split()]
    return round(float(np.mean(ss)), 2)

word_length_udf = udf(word_length, DoubleType())
word_count_udf = udf(word_count, IntegerType())
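Before wrapping them as UDFs, the two helpers can be sanity-checked as plain Python functions (the sample sentence below is made up for illustration):

```python
import numpy as np

def word_count(s):
    # Number of whitespace-separated tokens
    return len(s.split())

def word_length(s):
    # Mean token length, rounded to two decimals
    ss = [len(w) for w in s.split()]
    return round(float(np.mean(ss)), 2)

sample = "This book was surprisingly good"
print(word_count(sample))   # → 5
print(word_length(sample))  # → 5.4 (token lengths 4, 4, 3, 12, 4)
```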
In [ ]:
data = raw_data \
    .select("rating", "text",
            word_count_udf("text").alias("wordCount"),
            word_length_udf("text").alias("wordLength")) \
    .withColumn("label", raw_data["rating"] > 3).drop("rating")
In [ ]:
data.show(5)
To choose the best LogisticRegression classifier using the pyspark library, we need to explicitly perform the following steps:

1. Process the data: tokenize the text column, hash the tokens into a feature vector, and merge the numeric features into that vector
2. Process the label column: cast it to the proper type
3. Train multiple LogisticRegression models with different hyperparameters on the train dataset
4. Score each trained model on the test dataset and select the one with the best metric
5. Evaluate the best model on the validation set

As you can see below, there is a lot of work involved and a lot of steps where something can go wrong!
In [ ]:
from pyspark.ml.feature import Tokenizer, HashingTF
from pyspark.ml.feature import VectorAssembler
# Featurize text column
tokenizer = Tokenizer(inputCol="text", outputCol="tokenizedText")
numFeatures = 10000
hashingScheme = HashingTF(inputCol="tokenizedText",
                          outputCol="TextFeatures",
                          numFeatures=numFeatures)
tokenizedData = tokenizer.transform(data)
featurizedData = hashingScheme.transform(tokenizedData)
# Merge text and numeric features in one feature column
feature_columns_array = ["TextFeatures", "wordCount", "wordLength"]
assembler = VectorAssembler(
    inputCols=feature_columns_array,
    outputCol="features")
assembledData = assembler.transform(featurizedData)
# Select only columns of interest
# Convert label column from boolean to int
processedData = assembledData \
    .select("label", "features") \
    .withColumn("label", assembledData.label.cast(IntegerType()))
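HashingTF maps each token to a vector index via the hashing trick. Spark's actual implementation uses MurmurHash3 and produces a sparse vector; the minimal pure-Python sketch below uses the built-in `hash` and a dense list purely to illustrate the idea:

```python
def hashing_tf(tokens, num_features):
    # Each token increments the bucket it hashes into;
    # colliding tokens simply share a bucket.
    vec = [0] * num_features
    for tok in tokens:
        vec[hash(tok) % num_features] += 1
    return vec

tokens = "the cat sat on the mat".split()
vec = hashing_tf(tokens, 16)
print(sum(vec))  # → 6: one count per token ("the" lands in its bucket twice)
```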
In [ ]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.classification import LogisticRegression
# Prepare data for learning
train, test, validation = processedData.randomSplit([0.60, 0.20, 0.20], seed=123)
# Train the models on the 'train' data
lrHyperParams = [0.05, 0.1, 0.2, 0.4]
logisticRegressions = [LogisticRegression(regParam=hyperParam)
                       for hyperParam in lrHyperParams]
evaluator = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction",
                                          metricName="areaUnderROC")
metrics = []
models = []
# Select the best model
for learner in logisticRegressions:
    model = learner.fit(train)
    models.append(model)
    scored_data = model.transform(test)
    metrics.append(evaluator.evaluate(scored_data))

best_metric = max(metrics)
best_model = models[metrics.index(best_metric)]
# Save model
best_model.write().overwrite().save("SparkMLExperiment.mmls")
# Get AUC on the validation dataset
scored_val = best_model.transform(validation)
print(evaluator.evaluate(scored_val))
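The areaUnderROC metric used above equals the probability that a randomly chosen positive example is scored higher than a randomly chosen negative one. For intuition, it can be computed from raw scores without Spark; a minimal sketch (ignoring tied scores) via the rank-sum formulation:

```python
def area_under_roc(scores, labels):
    # Mann-Whitney / rank-sum formulation of AUC (no tie handling)
    ranked = sorted(zip(scores, labels))
    rank_sum = sum(rank for rank, (_, y) in enumerate(ranked, start=1) if y == 1)
    n_pos = sum(labels)
    n_neg = len(labels) - n_pos
    return (rank_sum - n_pos * (n_pos + 1) / 2) / (n_pos * n_neg)

print(area_under_roc([0.1, 0.4, 0.35, 0.8], [0, 0, 1, 1]))  # → 0.75
```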
Life is a lot simpler when using mmlspark!

The TrainClassifier Estimator featurizes the data internally, as long as the columns selected in the train, test, and validation datasets represent the features.

The FindBestModel Estimator finds the best model from a pool of trained models by choosing the model that performs best on the test dataset, given the specified metric.

The ComputeModelStatistics Transformer computes several metrics on a scored dataset (in our case, the validation dataset) at the same time.
In [ ]:
from mmlspark import TrainClassifier, FindBestModel, ComputeModelStatistics
# Prepare data for learning
train, test, validation = data.randomSplit([0.60, 0.20, 0.20], seed=123)
# Train the models on the 'train' data
lrHyperParams = [0.05, 0.1, 0.2, 0.4]
logisticRegressions = [LogisticRegression(regParam=hyperParam)
                       for hyperParam in lrHyperParams]
lrmodels = [TrainClassifier(model=lrm, labelCol="label", numFeatures=10000).fit(train)
            for lrm in logisticRegressions]
# Select the best model
bestModel = FindBestModel(evaluationMetric="AUC", models=lrmodels).fit(test)
# Save model
bestModel.write().overwrite().save("MMLSExperiment.mmls")
# Get AUC on the validation dataset
predictions = bestModel.transform(validation)
metrics = ComputeModelStatistics().transform(predictions)
print("Best model's AUC on validation set = "
      + "{0:.2f}%".format(metrics.first()["AUC"] * 100))